/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.ql.exec;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.OpTraits;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.Statistics;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.ql.stats.fs.FSStatsPublisher;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base operator implementation.
 *
 * Each step of query execution corresponds to one logical operator (Operator), and each
 * Operator emits a virtual table.
 *
 * RowSchema describes the columns this Operator outputs.
 * inputObjInspectors and outputObjInspector are used to interpret the input and output rows.
 * process() receives rows handed over by the parent Operator; forward() passes the processed
 * rows on to the child Operators.
 * After a row has been handled by an Operator its columns are renumbered; colExprMap records,
 * for each expression, the correspondence between its name before and after this Operator,
 * which the subsequent logical optimization phase uses to trace column names back.
 * Because the generated MapReduce program is dynamic (it is not known up front whether a given
 * MapReduce job will run a Join, a GroupBy, or something else), an Operator keeps all the
 * parameters it needs at run time in its OperatorDesc. The OperatorDesc is serialized to HDFS
 * before the job is submitted and read back and deserialized before the MapReduce task runs.
 * The map-side OperatorTree is stored on HDFS at Job.getConf("hive.exec.plan") + "/map.xml".
 *
 * Common concrete implementations:
 * GroupByOperator
 * UnionOperator
 * UDTFOperator
 * TableScanOperator
 * FilterOperator
 * SelectOperator
 * LimitOperator
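 *
 * <p>A minimal sketch (purely illustrative; not an operator that ships with Hive) of how a
 * subclass typically consumes rows from its parent and forwards them downstream:
 * <pre>{@code
 * public class PassThroughOperator extends Operator<SelectDesc> {
 *   public void process(Object row, int tag) throws HiveException {
 *     // inputObjInspectors[tag] describes the layout of rows arriving from parent "tag"
 *     forward(row, inputObjInspectors[tag]);
 *   }
 *   public String getName() { return "PASSTHROUGH"; }
 *   public OperatorType getType() { return null; }
 * }
 * }</pre>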
 **/
public abstract class Operator<T extends OperatorDesc> implements Serializable, Cloneable, Node {

    // Bean methods
    private static final long serialVersionUID = 1L;

    public static final String HIVECOUNTERCREATEDFILES = "CREATED_FILES";
    public static final String HIVECOUNTERFATAL = "FATAL_ERROR";
    public static final String CONTEXT_NAME_KEY = "__hive.context.name";

    private transient Configuration configuration;
    protected transient CompilationOpContext cContext;
    protected List<Operator<? extends OperatorDesc>> childOperators;
    protected List<Operator<? extends OperatorDesc>> parentOperators;
    protected String operatorId;
    protected final AtomicBoolean abortOp;
    private transient ExecMapperContext execContext;
    private transient boolean rootInitializeCalled = false;
    protected transient long runTimeNumRows;
    protected int indexForTezUnion = -1;
    private transient Configuration hconf;
    protected final transient Collection<Future<?>> asyncInitOperations = new HashSet<>();

    // It can be optimized later so that an operator operation (init/close) is performed
    // only after that operation has been performed on all the parents. This will require
    // initializing the whole tree in all the mappers (which might be required for mappers
    // spanning multiple files anyway, in the future).

    /**
     * State.
     */
    public static enum State {
        UNINIT, // initialize() has not been called
        INIT, // initialize() has been called and close() has not been called,
        // or close() has been called but one of its parents is not closed yet.
        CLOSE
        // all parent operators are in state CLOSE and this operator has called close()
        // on its children. Note: close() having been called and the state being CLOSE
        // are different things, since close() may have been called while the state is
        // still not CLOSE because one of the parents is not yet in state CLOSE.
    }

    protected transient State state = State.UNINIT;

    private boolean useBucketizedHiveInputFormat;

    // dummy operator (for not increasing seqId)
    protected Operator(String name, CompilationOpContext cContext) {
        this();
        this.cContext = cContext;
        this.id = name;
        initOperatorId();
    }

    protected Operator() {
        childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
        parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
        abortOp = new AtomicBoolean(false);
    }

    public Operator(CompilationOpContext cContext) {
        this(String.valueOf(cContext.nextOperatorId()), cContext);
    }

    public void setChildOperators(
            List<Operator<? extends OperatorDesc>> childOperators) {
        if (childOperators == null) {
            childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
        }
        this.childOperators = childOperators;
    }

    public Configuration getConfiguration() {
        return configuration;
    }

    public List<Operator<? extends OperatorDesc>> getChildOperators() {
        return childOperators;
    }

    public int getNumChild() {
        return childOperators == null ? 0 : childOperators.size();
    }

    /**
     * Implements the getChildren function for the Node Interface.
     */
    @Override
    public ArrayList<Node> getChildren() {

        if (getChildOperators() == null) {
            return null;
        }

        ArrayList<Node> ret_vec = new ArrayList<Node>();
        for (Operator<? extends OperatorDesc> op : getChildOperators()) {
            ret_vec.add(op);
        }

        return ret_vec;
    }

    public void setParentOperators(
            List<Operator<? extends OperatorDesc>> parentOperators) {
        if (parentOperators == null) {
            parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
        }
        this.parentOperators = parentOperators;
    }

    public List<Operator<? extends OperatorDesc>> getParentOperators() {
        return parentOperators;
    }

    public int getNumParent() {
        return parentOperators == null ? 0 : parentOperators.size();
    }

    protected T conf;
    protected boolean done;

    public void setConf(T conf) {
        this.conf = conf;
    }

    @Explain
    public T getConf() {
        return conf;
    }

    public boolean getDone() {
        return done;
    }

    protected final void setDone(boolean done) {
        this.done = done;
    }

    // non-bean fields needed during compilation
    private RowSchema rowSchema;

    public void setSchema(RowSchema rowSchema) {
        this.rowSchema = rowSchema;
    }

    public RowSchema getSchema() {
        return rowSchema;
    }

    // non-bean ..

    protected transient Map<String, LongWritable> statsMap = new HashMap<String, LongWritable>();
    @SuppressWarnings("rawtypes")
    protected transient OutputCollector out;
    protected transient final Logger LOG = LoggerFactory.getLogger(getClass().getName());
    protected transient final Logger PLOG = LoggerFactory.getLogger(Operator.class.getName()); // used to
    // disable logging from all operators at once
    protected transient final boolean isLogInfoEnabled = LOG.isInfoEnabled() && PLOG.isInfoEnabled();
    protected transient final boolean isLogDebugEnabled = LOG.isDebugEnabled() && PLOG.isDebugEnabled();
    protected transient final boolean isLogTraceEnabled = LOG.isTraceEnabled() && PLOG.isTraceEnabled();
    protected transient String alias;
    protected transient Reporter reporter;
    protected String id;
    // object inspectors for input rows
    // We will increase the size of the array on demand
    protected transient ObjectInspector[] inputObjInspectors = new ObjectInspector[1];
    // for output rows of this operator
    protected transient ObjectInspector outputObjInspector;

    /**
     * A map from output column name to input expression. It is built during semantic
     * analysis and used by the optimizer; for ReduceSink and GroupBy operators it
     * contains only the key columns.
     */
    protected Map<String, ExprNodeDesc> colExprMap;

    public void setId(String id) {
        this.id = id;
    }

    /**
     * This function is not named getId(), to make sure java serialization does
     * NOT serialize it. Some TestParse tests will fail if we serialize this
     * field, since the Operator ID will change based on the number of query
     * tests.
     */
    public String getIdentifier() {
        return id;
    }

    public void setReporter(Reporter rep) {
        reporter = rep;

        for (Operator<? extends OperatorDesc> op : childOperators) {
            op.setReporter(rep);
        }
    }

    @SuppressWarnings("rawtypes")
    public void setOutputCollector(OutputCollector out) {
        this.out = out;

        for (Operator<? extends OperatorDesc> op : childOperators) {
            op.setOutputCollector(out);
        }
    }

    /**
     * Store the alias this operator is working on behalf of.
     */
    public void setAlias(String alias) {
        this.alias = alias;

        for (Operator<? extends OperatorDesc> op : childOperators) {
            op.setAlias(alias);
        }
    }

    public Map<String, Long> getStats() {
        HashMap<String, Long> ret = new HashMap<String, Long>();
        for (String one : statsMap.keySet()) {
            ret.put(one, Long.valueOf(statsMap.get(one).get()));
        }
        return (ret);
    }

    /**
     * checks whether all parent operators are initialized or not.
     *
     * @return true if there are no parents or all parents are initialized. false
     * otherwise
     */
    protected boolean areAllParentsInitialized() {
        for (Operator<? extends OperatorDesc> parent : parentOperators) {
            if (parent == null) {
                //return true;
                continue;
            }
            if (parent.state != State.INIT) {
                return false;
            }
        }
        return true;
    }

    /**
     * Initializes operators only if all parents have been initialized. Calls
     * operator specific initializer which then initializes child ops.
     *
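     * <p>A hedged sketch of how a runtime driver might start an operator tree; rootOp,
     * rowOI and jobConf below are assumptions for illustration only:
     * <pre>{@code
     * ObjectInspector[] ois = new ObjectInspector[] { rowOI };
     * rootOp.setReporter(reporter);
     * rootOp.initialize(jobConf, ois); // cascades to children via initializeChildren()
     * }</pre>
     *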
     * @param hconf
     * @param inputOIs input object inspector array indexed by tag id. A null value is
     *                 ignored.
     * @throws HiveException
     */
    @SuppressWarnings("unchecked")
    public final void initialize(Configuration hconf, ObjectInspector[] inputOIs) throws HiveException {
        this.done = false;
        if (state == State.INIT) {
            return;
        }

        this.configuration = hconf;
        if (!areAllParentsInitialized()) {
            return;
        }

        if (isLogInfoEnabled) {
            LOG.info("Initializing operator " + this);
        }

        if (inputOIs != null) {
            inputObjInspectors = inputOIs;
        }

        // Initialize the structures that hold child op info. The operator tree can change
        // while initializing, so this needs to be done here instead of in the constructor.
        childOperatorsArray = new Operator[childOperators.size()];
        for (int i = 0; i < childOperatorsArray.length; i++) {
            childOperatorsArray[i] = childOperators.get(i);
        }
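        // childOperatorsTag[i] records the position this operator occupies in child i's parent
        // list; it is later passed as the tag argument to the child's initialize() and process().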
        childOperatorsTag = new int[childOperatorsArray.length];
        for (int i = 0; i < childOperatorsArray.length; i++) {
            List<Operator<? extends OperatorDesc>> parentOperators = childOperatorsArray[i].getParentOperators();
            childOperatorsTag[i] = parentOperators.indexOf(this);
            if (childOperatorsTag[i] == -1) {
                throw new HiveException("Hive internal error: cannot find parent in the child operator!");
            }
        }

        if (inputObjInspectors.length == 0) {
            throw new HiveException("Internal Error during operator initialization.");
        }

        // derived classes can set this to different object if needed
        outputObjInspector = inputObjInspectors[0];

        boolean isInitOk = false;
        try {
            initializeOp(hconf);
            // sanity checks
            if (!rootInitializeCalled || childOperatorsArray.length != childOperators.size()) {
                throw new AssertionError("Internal error during operator initialization");
            }
            if (isLogDebugEnabled) {
                LOG.debug("Initialization Done " + id + " " + getName());
            }

            initializeChildren(hconf);
            isInitOk = true;
        } finally {
            // TODO: ugly hack because Java doesn't have dtors and Tez input hangs on shutdown.
            if (!isInitOk) {
                cancelAsyncInitOps();
            }
        }

        if (isLogDebugEnabled) {
            LOG.debug("Initialization Done " + id + " " + getName() + " done is reset.");
        }

        // let's wait on the async ops before continuing
        completeInitialization(asyncInitOperations);
    }

    private void cancelAsyncInitOps() {
        for (Future<?> f : asyncInitOperations) {
            f.cancel(true);
        }
        asyncInitOperations.clear();
    }

    private void completeInitialization(Collection<Future<?>> fs) throws HiveException {
        Object[] os = new Object[fs.size()];
        int i = 0;
        Throwable asyncEx = null;

        // Wait for all futures to complete. Check for an abort while waiting for each future. If any of the
        // futures is cancelled / aborted - cancel all subsequent futures.

        boolean cancelAll = false;
        for (Future<?> f : fs) {
            // If aborted - break out of the loop, and cancel all subsequent futures.
            if (cancelAll) {
                break;
            }
            if (abortOp.get()) {
                cancelAll = true;
                break;
            } else {
                // Wait for the current future.
                while (true) {
                    if (abortOp.get()) {
                        cancelAll = true;
                        break;
                    } else {
                        try {
                            // Await future result with a timeout to check the abort field occasionally.
                            // It's possible that the interrupt which comes in along with an abort, is suppressed
                            // by some other operator.
                            Object futureResult = f.get(200L, TimeUnit.MILLISECONDS);
                            os[i++] = futureResult;
                            break;
                        } catch (TimeoutException e) {
                            // Expected if the operation takes time. Continue the loop, and wait for op completion.
                        } catch (InterruptedException | CancellationException e) {
                            asyncEx = e;
                            cancelAll = true;
                            break;
                        } catch (ExecutionException e) {
                            if (e.getCause() == null) {
                                asyncEx = e;
                            } else {
                                asyncEx = e.getCause();
                            }
                            cancelAll = true;
                            break;
                        }
                    }
                }

            }
        }

        if (cancelAll || asyncEx != null) {
            for (Future<?> f : fs) {
                // It's ok to send a cancel to an already completed future; it is a no-op.
                f.cancel(true);
            }
            throw new HiveException("Async Initialization failed. abortRequested=" + abortOp.get(), asyncEx);
        }

        completeInitializationOp(os);
    }

    /**
     * This method can be used to retrieve the results from async operations
     * started at init time - before the operator pipeline is started.
     *
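     * <p>A minimal sketch, assuming a subclass that loads a hash table in the background;
     * the executor, loadHashTable and hashTable names are illustrative assumptions, not
     * members of this class:
     * <pre>{@code
     * // in initializeOp(): kick off the background work and register the future
     * Future<?> f = executor.submit(() -> loadHashTable(hconf));
     * asyncInitOperations.add(f);
     *
     * // in completeInitializationOp(): with a single async op its result is os[0]
     * protected void completeInitializationOp(Object[] os) {
     *   this.hashTable = os[0];
     * }
     * }</pre>
     *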
     * @param os
     * @throws HiveException
     */
    protected void completeInitializationOp(Object[] os) throws HiveException {
        // no-op default
    }

    public void initializeLocalWork(Configuration hconf) throws HiveException {
        if (childOperators != null) {
            for (int i = 0; i < childOperators.size(); i++) {
                Operator<? extends OperatorDesc> childOp = this.childOperators.get(i);
                childOp.initializeLocalWork(hconf);
            }
        }
    }

    /**
     * Operator specific initialization.
     */
    protected void initializeOp(Configuration hconf) throws HiveException {
        this.hconf = hconf;
        rootInitializeCalled = true;
        runTimeNumRows = 0;
    }

    /**
     * Calls initialize on each of the children with this operator's outputObjInspector as
     * the output row format (i.e. each child's input row inspector).
     */
    protected void initializeChildren(Configuration hconf) throws HiveException {
        state = State.INIT;
        if (isLogDebugEnabled) {
            LOG.debug("Operator " + id + " " + getName() + " initialized");
        }
        if (childOperators == null || childOperators.isEmpty()) {
            return;
        }
        if (isLogDebugEnabled) {
            LOG.debug("Initializing children of " + id + " " + getName());
        }
        for (int i = 0; i < childOperatorsArray.length; i++) {
            childOperatorsArray[i].initialize(hconf, outputObjInspector, childOperatorsTag[i]);
            if (reporter != null) {
                childOperatorsArray[i].setReporter(reporter);
            }
        }
    }

    public void abort() {
        LOG.info("Received abort in operator: {}", getName());
        abortOp.set(true);
    }

    /**
     * Pass the execContext reference to every child operator
     */
    public void passExecContext(ExecMapperContext execContext) {
        this.setExecContext(execContext);
        for (int i = 0; i < childOperators.size(); i++) {
            childOperators.get(i).passExecContext(execContext);
        }
    }

    /**
     * Collects the output object inspector of each parent and then calls the actual
     * initialization method.
     *
     * @param hconf
     * @param inputOI  OI of the row that this parent will pass to this op
     * @param parentId parent operator id
     * @throws HiveException
     */
    protected void initialize(
            Configuration hconf, ObjectInspector inputOI, int parentId) throws HiveException {
        if (isLogDebugEnabled) {
            LOG.debug("Initializing child " + id + " " + getName());
        }
        // Double the size of the array if needed
        if (parentId >= inputObjInspectors.length) {
            int newLength = inputObjInspectors.length * 2;
            while (parentId >= newLength) {
                newLength *= 2;
            }
            inputObjInspectors = Arrays.copyOf(inputObjInspectors, newLength);
        }
        inputObjInspectors[parentId] = inputOI;
        // call the actual operator initialization function
        initialize(hconf, null);
    }

    public ObjectInspector[] getInputObjInspectors() {
        return inputObjInspectors;
    }

    public void setInputObjInspectors(ObjectInspector[] inputObjInspectors) {
        this.inputObjInspectors = inputObjInspectors;
    }

    public ObjectInspector getOutputObjInspector() {
        return outputObjInspector;
    }

    /**
     * Process the row.
     *
     * @param row The object representing the row.
     * @param tag The tag of the row usually means which parent this row comes from.
     *            Rows with the same tag should have exactly the same rowInspector
     *            all the time.
     */
    public abstract void process(Object row, int tag) throws HiveException;

    protected final void defaultStartGroup() throws HiveException {
        if (isLogDebugEnabled) {
            LOG.debug("Starting group");
        }

        if (childOperators == null) {
            return;
        }

        if (isLogDebugEnabled) {
            LOG.debug("Starting group for children:");
        }
        for (Operator<? extends OperatorDesc> op : childOperators) {
            op.startGroup();
        }

        if (isLogDebugEnabled) {
            LOG.debug("Start group Done");
        }
    }

    protected final void defaultEndGroup() throws HiveException {
        if (isLogDebugEnabled) {
            LOG.debug("Ending group");
        }

        if (childOperators == null) {
            return;
        }

        if (isLogDebugEnabled) {
            LOG.debug("Ending group for children:");
        }
        for (Operator<? extends OperatorDesc> op : childOperators) {
            op.endGroup();
        }

        if (isLogDebugEnabled) {
            LOG.debug("End group Done");
        }
    }

    // If an operator wants to do some work at the beginning of a group
    public void startGroup() throws HiveException {
        defaultStartGroup();
    }

    // If an operator wants to do some work at the end of a group
    public void endGroup() throws HiveException {
        defaultEndGroup();
    }

    // A blocking operator (e.g. GroupByOperator or JoinOperator) can
    // override this method to forward its outputs
    public void flush() throws HiveException {
    }

    public void processGroup(int tag) throws HiveException {
        if (childOperators == null || childOperators.isEmpty()) {
            return;
        }
        for (int i = 0; i < childOperatorsArray.length; i++) {
            childOperatorsArray[i].processGroup(childOperatorsTag[i]);
        }
    }

    protected boolean allInitializedParentsAreClosed() {
        if (parentOperators != null) {
            for (Operator<? extends OperatorDesc> parent : parentOperators) {
                if (parent == null) {
                    continue;
                }
                if (isLogDebugEnabled) {
                    LOG.debug("allInitializedParentsAreClosed? parent.state = " + parent.state);
                }
                if (!(parent.state == State.CLOSE || parent.state == State.UNINIT)) {
                    return false;
                }
            }
        }
        return true;
    }

    // This close() function does not need to be synchronized
    // since it is called by its parents' main thread, so no
    // more than 1 thread should call this close() function.
    public void close(boolean abort) throws HiveException {
        if (isLogDebugEnabled) {
            LOG.debug("close called for operator " + this);
        }

        if (state == State.CLOSE) {
            return;
        }

        // check if all parents are finished
        if (!allInitializedParentsAreClosed()) {
            if (isLogDebugEnabled) {
                LOG.debug("Not all parent operators are closed. Not closing.");
            }
            return;
        }

        // set state as CLOSE as long as all parents are closed
        // state == CLOSE doesn't mean all children are also in state CLOSE
        state = State.CLOSE;
        if (isLogInfoEnabled) {
            LOG.info("Closing operator " + this);
        }

        abort |= abortOp.get();

        // call the operator specific close routine
        closeOp(abort);

        reporter = null;

        try {
            logStats();
            if (childOperators == null) {
                return;
            }

            for (Operator<? extends OperatorDesc> op : childOperators) {
                if (isLogDebugEnabled) {
                    LOG.debug("Closing child = " + op);
                }
                op.close(abort);
            }

            if (isLogDebugEnabled) {
                LOG.debug(id + " Close done");
            }
        } catch (HiveException e) {
            LOG.warn("Caught exception while closing operator: " + e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Operator-specific close routine. Operators that inherit from this class
     * should override this function for their specific cleanup routine.
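     * <p>A minimal sketch of a typical override (flushBufferedRows is an illustrative
     * helper, not part of this class); calling super.closeOp keeps the runtime-stats
     * publishing and counter reset below working:
     * <pre>{@code
     * protected void closeOp(boolean abort) throws HiveException {
     *   flushBufferedRows(abort);
     *   super.closeOp(abort);
     * }
     * }</pre>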
     */
    protected void closeOp(boolean abort) throws HiveException {
        if (conf != null && conf.getRuntimeStatsTmpDir() != null) {
            publishRunTimeStats();
        }
        runTimeNumRows = 0;
    }

    private boolean jobCloseDone = false;

    // Operator specific logic goes here
    public void jobCloseOp(Configuration conf, boolean success) throws HiveException {
    }

    /**
     * Unlike other operator interfaces, which are called from the map or reduce task,
     * jobClose is called from the job client side once the job has completed.
     *
     * @param conf    Configuration with which the job was submitted
     * @param success whether the job was completed successfully or not
     */
    public void jobClose(Configuration conf, boolean success) throws HiveException {
        // JobClose has already been performed on this operator
        if (jobCloseDone) {
            return;
        }

        jobCloseOp(conf, success);
        jobCloseDone = true;

        if (childOperators != null) {
            for (Operator<? extends OperatorDesc> op : childOperators) {
                op.jobClose(conf, success);
            }
        }
    }

    /**
     * Cache childOperators in an array for faster access. childOperatorsArray is
     * accessed per row, so it's important to make the access efficient.
     */
    protected transient Operator<? extends OperatorDesc>[] childOperatorsArray = null;
    protected transient int[] childOperatorsTag;

    /**
     * Replace one child with another at the same position. The parent of the
     * child is not changed
     *
     * @param child    the old child
     * @param newChild the new child
     */
    public void replaceChild(
            Operator<? extends OperatorDesc> child, Operator<? extends OperatorDesc> newChild) {
        int childIndex = childOperators.indexOf(child);
        assert childIndex != -1;
        childOperators.set(childIndex, newChild);
    }

    public void removeChild(Operator<? extends OperatorDesc> child) {
        int childIndex = childOperators.indexOf(child);
        assert childIndex != -1;
        if (childOperators.size() == 1) {
            setChildOperators(null);
        } else {
            childOperators.remove(childIndex);
        }

        int parentIndex = child.getParentOperators().indexOf(this);
        assert parentIndex != -1;
        if (child.getParentOperators().size() == 1) {
            child.setParentOperators(null);
        } else {
            child.getParentOperators().remove(parentIndex);
        }
    }

    /**
     * Removes a child and splices all of the child's children into the child's position.
     *
     * @param child the child to remove. If this operator is not the only parent of the
     *              child, the result can be unpredictable.
     * @throws SemanticException
     */
    public void removeChildAndAdoptItsChildren(
            Operator<? extends OperatorDesc> child) throws SemanticException {
        int childIndex = childOperators.indexOf(child);
        if (childIndex == -1) {
            throw new SemanticException("Exception when trying to remove partition predicates: fail to find " +
                    "child from parent");
        }

        childOperators.remove(childIndex);
        if (child.getChildOperators() != null && child.getChildOperators().size() > 0) {
            childOperators.addAll(childIndex, child.getChildOperators());
        }

        for (Operator<? extends OperatorDesc> gc : child.getChildOperators()) {
            List<Operator<? extends OperatorDesc>> parents = gc.getParentOperators();
            int index = parents.indexOf(child);
            if (index == -1) {
                throw new SemanticException("Exception when trying to remove partition predicates: fail to find "
                        + "parent from child");
            }
            parents.set(index, this);
        }
    }

    public void removeParent(Operator<? extends OperatorDesc> parent) {
        int parentIndex = parentOperators.indexOf(parent);
        assert parentIndex != -1;
        if (parentOperators.size() == 1) {
            setParentOperators(null);
        } else {
            parentOperators.remove(parentIndex);
        }

        int childIndex = parent.getChildOperators().indexOf(this);
        assert childIndex != -1;
        if (parent.getChildOperators().size() == 1) {
            parent.setChildOperators(null);
        } else {
            parent.getChildOperators().remove(childIndex);
        }
    }

    /**
     * Replace one parent with another at the same position. Children of the new
     * parent are not updated.
     *
     * @param parent    the old parent
     * @param newParent the new parent
     */
    public void replaceParent(
            Operator<? extends OperatorDesc> parent, Operator<? extends OperatorDesc> newParent) {
        int parentIndex = parentOperators.indexOf(parent);
        assert parentIndex != -1;
        parentOperators.set(parentIndex, newParent);
    }

    protected long getNextCntr(long cntr) {
        // A very simple schedule for reporting the number of rows processed by an operator:
        // report at 1, 10, 100, ..., 1,000,000 rows, and after that at every additional
        // million rows (exponential growth at first, linear afterwards).
        if (cntr >= 1000000) {
            return cntr + 1000000;
        }

        return 10 * cntr;
    }

    protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {
        runTimeNumRows++;
        if (getDone()) {
            return;
        }

        int childrenDone = 0;
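        // Push this row to every child that is still consuming input; childOperatorsTag[i]
        // tells child i which of its parents the row came from.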
        for (int i = 0; i < childOperatorsArray.length; i++) {
            Operator<? extends OperatorDesc> o = childOperatorsArray[i];
            if (o.getDone()) {
                childrenDone++;
            } else {
                o.process(row, childOperatorsTag[i]);
            }
        }

        // if all children are done, this operator is also done
        if (childrenDone != 0 && childrenDone == childOperatorsArray.length) {
            setDone(true);
        }
    }

    public void resetStats() {
        for (String e : statsMap.keySet()) {
            statsMap.get(e).set(0L);
        }
    }

    public void reset() {
        this.state = State.INIT;
        if (childOperators != null) {
            for (Operator<? extends OperatorDesc> o : childOperators) {
                o.reset();
            }
        }

    }

    /**
     * OperatorFunc.
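     * A callback applied to each operator by {@link #preorderMap(OperatorFunc)}. A hedged
     * usage sketch (rootOp and the names list are illustrative only):
     * <pre>{@code
     * final List<String> names = new ArrayList<String>();
     * rootOp.preorderMap(new OperatorFunc() {
     *   public void func(Operator<? extends OperatorDesc> op) {
     *     names.add(op.getName());
     *   }
     * });
     * }</pre>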
     */
    public static interface OperatorFunc {
        void func(Operator<? extends OperatorDesc> op);
    }

    public void preorderMap(OperatorFunc opFunc) {
        opFunc.func(this);
        if (childOperators != null) {
            for (Operator<? extends OperatorDesc> o : childOperators) {
                o.preorderMap(opFunc);
            }
        }
    }

    public void logStats() {
        if (isLogInfoEnabled && !statsMap.isEmpty()) {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, LongWritable> e : statsMap.entrySet()) {
                sb.append(e.getKey()).append(":").append(e.getValue()).append(", ");
            }
            LOG.info(sb.toString());
        }
    }

    @Override
    public abstract String getName();

    static public String getOperatorName() {
        return "OP";
    }

    /**
     * Returns the map of output column name to input expression. Note that
     * currently it contains only key columns for ReduceSink and GroupBy operators.
     *
     * @return null if the operator doesn't change columns
     */
    public Map<String, ExprNodeDesc> getColumnExprMap() {
        return colExprMap;
    }

    public void setColumnExprMap(Map<String, ExprNodeDesc> colExprMap) {
        this.colExprMap = colExprMap;
    }

    private String getLevelString(int level) {
        if (level == 0) {
            return "\n";
        }
        StringBuilder s = new StringBuilder();
        s.append("\n");
        while (level > 0) {
            s.append("  ");
            level--;
        }
        return s.toString();
    }

    public String dump(int level) {
        return dump(level, new HashSet<Integer>());
    }

    public String dump(int level, HashSet<Integer> seenOpts) {
        if (seenOpts.contains(Integer.valueOf(id))) {
            return null;
        }
        seenOpts.add(Integer.valueOf(id));

        StringBuilder s = new StringBuilder();
        String ls = getLevelString(level);
        s.append(ls);
        s.append("<" + getName() + ">");
        s.append("Id =" + id);

        if (childOperators != null) {
            s.append(ls);
            s.append("  <Children>");
            for (Operator<? extends OperatorDesc> o : childOperators) {
                s.append(o.dump(level + 2, seenOpts));
            }
            s.append(ls);
            s.append("  <\\Children>");
        }

        if (parentOperators != null) {
            s.append(ls);
            s.append("  <Parent>");
            for (Operator<? extends OperatorDesc> o : parentOperators) {
                s.append("Id = " + o.id + " ");
                s.append(o.dump(level, seenOpts));
            }
            s.append("<\\Parent>");
        }

        s.append(ls);
        s.append("<\\" + getName() + ">");
        return s.toString();
    }

    /**
     * Initialize an array of ExprNodeEvaluator and return the result
     * ObjectInspectors.
     */
    protected static ObjectInspector[] initEvaluators(
            ExprNodeEvaluator<?>[] evals, ObjectInspector rowInspector) throws HiveException {
        ObjectInspector[] result = new ObjectInspector[evals.length];
        for (int i = 0; i < evals.length; i++) {
            result[i] = evals[i].initialize(rowInspector);
        }
        return result;
    }

    /**
     * Initialize an array of ExprNodeEvaluator from start, for specified length
     * and return the result ObjectInspectors.
     */
    protected static ObjectInspector[] initEvaluators(
            ExprNodeEvaluator<?>[] evals, int start, int length, ObjectInspector rowInspector) throws HiveException {
        ObjectInspector[] result = new ObjectInspector[length];
        for (int i = 0; i < length; i++) {
            result[i] = evals[start + i].initialize(rowInspector);
        }
        return result;
    }

    /**
     * Initialize an array of ExprNodeEvaluator and put the resulting ObjectInspectors
     * into a StructObjectInspector using the given output column names.
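     * <p>A hedged sketch of typical use inside an operator's initializeOp (the evals array
     * and the column names are assumptions for illustration):
     * <pre>{@code
     * outputObjInspector = initEvaluatorsAndReturnStruct(
     *     evals, Arrays.asList("_col0", "_col1"), inputObjInspectors[0]);
     * }</pre>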
     */
    protected static StructObjectInspector initEvaluatorsAndReturnStruct(
            ExprNodeEvaluator<?>[] evals, List<String> outputColName, ObjectInspector rowInspector) throws HiveException {
        ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, rowInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(outputColName,
                Arrays.asList(fieldObjectInspectors));
    }

    protected transient Object groupKeyObject;

    public String getOperatorId() {
        return operatorId;
    }

    public void initOperatorId() {
        setOperatorId(getName() + "_" + this.id);
    }

    public void setOperatorId(String operatorId) {
        this.operatorId = operatorId;
    }

    /*
     * By default, the list is empty - if an operator wants to add more counters,
     * it should override this method and provide the new list. Counter names returned
     * by this method should be wrapped counter names (i.e the strings should be passed
     * through getWrappedCounterName).
     */
    protected List<String> getAdditionalCounters() {
        return null;
    }

    /**
     * Return the type of the specific operator among the
     * types in OperatorType.
     *
     * @return OperatorType.*
     */
    abstract public OperatorType getType();

    public void setGroupKeyObject(Object keyObject) {
        this.groupKeyObject = keyObject;
    }

    public Object getGroupKeyObject() {
        return groupKeyObject;
    }

    /**
     * Called during semantic analysis as operators are being added
     * in order to give them a chance to compute any additional plan information
     * needed.  Does nothing by default.
     */
    public void augmentPlan() {
    }

    public ExecMapperContext getExecContext() {
        return execContext;
    }

    public void setExecContext(ExecMapperContext execContext) {
        this.execContext = execContext;
    }

    // The input file has changed - every operator can invoke specific action
    // for each input file
    public void cleanUpInputFileChanged() throws HiveException {
        this.cleanUpInputFileChangedOp();
        if (this.childOperators != null) {
            for (int i = 0; i < this.childOperators.size(); i++) {
                Operator<? extends OperatorDesc> op = this.childOperators.get(i);
                op.cleanUpInputFileChanged();
            }
        }
    }

    // If a operator needs to invoke specific cleanup, that operator can override
    // this method
    public void cleanUpInputFileChangedOp() throws HiveException {
    }

    // called by map operator. propagated recursively to single parented descendants
    public void setInputContext(String tableName, String partitionName) {
        if (childOperators != null) {
            for (Operator<? extends OperatorDesc> child : childOperators) {
                if (child.getNumParent() == 1) {
                    child.setInputContext(tableName, partitionName);
                }
            }
        }
    }

    public boolean supportSkewJoinOptimization() {
        return false;
    }

    @Override
    public Operator<? extends OperatorDesc> clone() throws CloneNotSupportedException {

        List<Operator<? extends OperatorDesc>> parents = getParentOperators();
        List<Operator<? extends OperatorDesc>> parentClones = new ArrayList<Operator<? extends OperatorDesc>>();

        if (parents != null) {
            for (Operator<? extends OperatorDesc> parent : parents) {
                parentClones.add((parent.clone()));
            }
        }

        @SuppressWarnings("unchecked") T descClone = (T) conf.clone();
        // also clone the colExprMap by default
        Operator<? extends OperatorDesc> ret = OperatorFactory.getAndMakeChild(cContext, descClone, getSchema(),
                getColumnExprMap(), parentClones);

        return ret;
    }

    /**
     * Clones only the operator. The children and parent are set
     * to null.
     *
     * @return Cloned operator
     * @throws CloneNotSupportedException
     */
    @SuppressWarnings("unchecked")
    public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
        T descClone = (T) conf.clone();
        Operator<? extends OperatorDesc> ret = OperatorFactory.getAndMakeChild(cContext, descClone, getSchema());
        return ret;
    }

    /**
     * Recursively clones this operator and all of its children, fixing the child, parent
     * and self pointers within the cloned subtree. It does not fix the pointers coming
     * from this operator's parents; they continue to point to the original operator.
     *
     * @return Cloned operator
     * @throws CloneNotSupportedException
     */
    public Operator<? extends OperatorDesc> cloneRecursiveChildren() throws CloneNotSupportedException {
        Operator<? extends OperatorDesc> newOp = this.cloneOp();
        newOp.setParentOperators(this.parentOperators);
        List<Operator<? extends OperatorDesc>> newChildren = new ArrayList<Operator<? extends OperatorDesc>>();

        for (Operator<? extends OperatorDesc> childOp : this.getChildOperators()) {
            List<Operator<? extends OperatorDesc>> parentList = new ArrayList<Operator<? extends OperatorDesc>>();
            for (Operator<? extends OperatorDesc> parent : childOp.getParentOperators()) {
                if (parent.equals(this)) {
                    parentList.add(newOp);
                } else {
                    parentList.add(parent);
                }
            }
            // Recursively clone the child subtree and register the clone as a child of newOp
            Operator<? extends OperatorDesc> clonedChildOp = childOp.cloneRecursiveChildren();
            clonedChildOp.setParentOperators(parentList);
            newChildren.add(clonedChildOp);
        }

        newOp.setChildOperators(newChildren);
        return newOp;
    }

    /*
     * True only for operators that produce at most one output row per input row.
     * This allows the output column names to be directly translated to input
     * column names.
     */
    public boolean columnNamesRowResolvedCanBeObtained() {
        return false;
    }

    public boolean isUseBucketizedHiveInputFormat() {
        return useBucketizedHiveInputFormat;
    }

    /**
     * Before setting this to {@code true}, make sure the operator is not reading ACID tables.
     *
     * @param useBucketizedHiveInputFormat
     */
    public void setUseBucketizedHiveInputFormat(boolean useBucketizedHiveInputFormat) {
        this.useBucketizedHiveInputFormat = useBucketizedHiveInputFormat;
    }

    /**
     * Whether this operator supports automatic sort merge join.
     * The stack is traversed, and this method is invoked for all the operators.
     *
     * @return TRUE if yes, FALSE otherwise.
     */
    public boolean supportAutomaticSortMergeJoin() {
        return false;
    }

    public boolean supportUnionRemoveOptimization() {
        return false;
    }

    /*
     * Whether this operator is allowed before mapjoin. Eventually the mapjoin hint should be
     * done away with, but since bucketized mapjoin and sortmerge join depend on it completely,
     * it is still needed. Check the operators which are allowed before mapjoin.
     */
    public boolean opAllowedBeforeMapJoin() {
        return true;
    }

    /*
     * Whether this operator is allowed after mapjoin. Eventually the mapjoin hint should be
     * done away with, but since bucketized mapjoin and sortmerge join depend on it completely,
     * it is still needed. Check the operators which are allowed after mapjoin.
     */
    public boolean opAllowedAfterMapJoin() {
        return true;
    }

    /*
     * If this task contains a join, it can be converted to a map-join task if this operator is
     * present in the mapper. For example, if a sort-merge join operator is present followed by
     * a regular join, the task cannot be converted to an auto map-join.
     */
    public boolean opAllowedConvertMapJoin() {
        return true;
    }

    /*
     * If this task contains a sortmergejoin, it can be converted to a map-join task if this operator
     * is present in the mapper. For example, if a sort-merge join operator is present followed
     * by a regular join, the task cannot be converted to an auto map-join.
     */
    public boolean opAllowedBeforeSortMergeJoin() {
        return true;
    }

    /**
     * Used by LimitPushdownOptimizer.
     *
     * If none of the operators between the limit and the reduce-sink removes any input rows
     * within the limit count, the limit can be pushed down to the reduce-sink operator
     * (e.g. forward, select).
     */
    public boolean acceptLimitPushdown() {
        return false;
    }

    @Override
    public String toString() {
        return getName() + "[" + getIdentifier() + "]";
    }

    public static String toString(Collection<TableScanOperator> top) {
        StringBuilder builder = new StringBuilder();
        Set<String> visited = new HashSet<String>();
        for (Operator<?> op : top) {
            if (builder.length() > 0) {
                builder.append('\n');
            }
            toString(builder, visited, op, 0);
        }
        return builder.toString();
    }

    static boolean toString(StringBuilder builder, Set<String> visited, Operator<?> op, int start) {
        String name = op.toString();
        boolean added = visited.add(name);
        if (start > 0) {
            builder.append("-");
            start++;
        }
        builder.append(name);
        start += name.length();
        if (added) {
            if (op.getNumChild() > 0) {
                List<Operator<?>> children = op.getChildOperators();
                for (int i = 0; i < children.size(); i++) {
                    if (i > 0) {
                        builder.append('\n');
                        for (int j = 0; j < start; j++) {
                            builder.append(' ');
                        }
                    }
                    toString(builder, visited, children.get(i), start);
                }
            }
            return true;
        }
        return false;
    }

    public Statistics getStatistics() {
        if (conf != null) {
            return conf.getStatistics();
        }

        return null;
    }

    public OpTraits getOpTraits() {
        if (conf != null) {
            return conf.getTraits();
        }

        return null;
    }

    public void setOpTraits(OpTraits metaInfo) {
        if (isLogDebugEnabled) {
            LOG.debug("Setting traits (" + metaInfo + ") on " + this);
        }
        if (conf != null) {
            conf.setTraits(metaInfo);
        } else {
            LOG.warn("Cannot set traits when there's no descriptor: " + this);
        }
    }

    public void setStatistics(Statistics stats) {
        if (isLogDebugEnabled) {
            LOG.debug("Setting stats (" + stats + ") on " + this);
        }
        if (conf != null) {
            conf.setStatistics(stats);
        } else {
            LOG.warn("Cannot set stats when there's no descriptor: " + this);
        }
    }

    @SuppressWarnings("rawtypes")
    public static Operator createDummy() {
        return new DummyOperator();
    }

    @SuppressWarnings({"serial", "unchecked", "rawtypes"})
    private static class DummyOperator extends Operator {
        public DummyOperator() {
            super("dummy", null);
        }

        @Override
        public void process(Object row, int tag) {
        }

        @Override
        public OperatorType getType() {
            return null;
        }

        @Override
        public String getName() {
            return DummyOperator.getOperatorName();
        }

        public static String getOperatorName() {
            return "DUMMY";
        }

        @Override
        protected void initializeOp(Configuration conf) {
        }
    }

    public void removeParents() {
        for (Operator<?> parent : new ArrayList<Operator<?>>(getParentOperators())) {
            removeParent(parent);
        }
    }

    public boolean getIsReduceSink() {
        return false;
    }

    public String getReduceOutputName() {
        return null;
    }

    public void setCompilationOpContext(CompilationOpContext ctx) {
        cContext = ctx;
    }

    /**
     * @return Compilation operator context. Only available during compilation.
     */
    public CompilationOpContext getCompilationOpContext() {
        return cContext;
    }

    private void publishRunTimeStats() throws HiveException {
        StatsPublisher statsPublisher = new FSStatsPublisher();
        StatsCollectionContext sContext = new StatsCollectionContext(hconf);
        sContext.setIndexForTezUnion(indexForTezUnion);
        sContext.setStatsTmpDir(conf.getRuntimeStatsTmpDir());

        if (!statsPublisher.connect(sContext)) {
            LOG.error("StatsPublishing error: cannot connect to database");
            throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg());
        }

        String prefix = "";
        Map<String, String> statsToPublish = new HashMap<String, String>();
        statsToPublish.put(StatsSetupConst.RUN_TIME_ROW_COUNT, Long.toString(runTimeNumRows));
        if (!statsPublisher.publishStat(prefix, statsToPublish)) {
            // The original exception is lost.
            // Not changing the interface to maintain backward compatibility
            throw new HiveException(ErrorMsg.STATSPUBLISHER_PUBLISHING_ERROR.getErrorCodedMsg());
        }
        if (!statsPublisher.closeConnection(sContext)) {
            // The original exception is lost.
            // Not changing the interface to maintain backward compatibility
            throw new HiveException(ErrorMsg.STATSPUBLISHER_CLOSING_ERROR.getErrorCodedMsg());
        }
    }

    public int getIndexForTezUnion() {
        return indexForTezUnion;
    }

    public void setIndexForTezUnion(int indexForTezUnion) {
        this.indexForTezUnion = indexForTezUnion;
    }
}
